package com.zjl.SpringBoot.第09章_中间件.消息队列Kafka;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@RestController
public class kafkaController {

    // Producer template; message key is a String, value serialization is decided
    // by the template's configured factory.
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Value of spring.application.name; used as the Kafka message key on every send.
    // (Previously named "hostName", which did not match what it holds.)
    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * Publishes the request body (as its Map#toString form) to the "kafkaBoot" topic
     * and returns the broker's send result as a string.
     *
     * @param body arbitrary JSON object from the request, deserialized into a Map
     * @return String form of the {@link SendResult} acknowledged by the broker
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if interrupted while waiting for the ack
     */
    // NOTE: @ResponseBody removed — it is implied by @RestController.
    @PostMapping("/kafka/add")
    public String add(@RequestBody Map<String, Object> body) throws ExecutionException, InterruptedException {
        StopWatch stopWatch = new StopWatch(); // Spring-provided stopwatch
        stopWatch.start();
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("kafkaBoot", applicationName, body.toString());
        // Wait for the broker acknowledgement BEFORE stopping the watch.
        // The original stopped the watch right after send() returned, which only
        // timed the asynchronous hand-off, not the actual send.
        SendResult<String, Object> result = future.get();
        stopWatch.stop();
        System.out.println("使用了【" + stopWatch.getTotalTimeMillis() + "】毫秒");
        return result.toString();
    }

    /**
     * Consumer for topic "kafkaBoot" in group "kafkaGroup"; prints each payload in magenta.
     *
     * NOTE(review): receiving a List here implies batch consumption, which only works
     * if the listener container factory has batch listening enabled — confirm against
     * the Kafka configuration (not visible in this file).
     *
     * @param messages payloads only (values, without keys/metadata)
     * @param topic    topic the batch was received from
     */
    @KafkaListener(topics = {"kafkaBoot"}, groupId = "kafkaGroup")
    public void topic_test1(List<Object> messages/*只获取value*/, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        for (Object message : messages) {
            System.out.print("\u001B[35m");
            System.out.print("kafkaGroup1 消费了 主题为【" + topic + "】的数据：\t" + message);
            System.out.println("\u001B[0m");
        }
    }

    /**
     * Consumer for topic "kafkaBoot" in group "kafkaGroup2"; prints key and value in green.
     * Different consumer groups each receive every message (no competition between groups).
     *
     * @param records full ConsumerRecords batch, including keys and metadata
     */
    @KafkaListener(topics = {"kafkaBoot"}, groupId = "kafkaGroup2")
    public void topic_test2(ConsumerRecords<String, Object> records) {
        for (ConsumerRecord<String, Object> record : records) {
            System.out.print("\u001B[32m");
            System.out.print("kafkaGroup2 消费了 主题为【" + record.topic() + "】的数据：\t 其中key为【" + record.key() + "】\tvalue为【" + record.value() + "】");
            System.out.println("\u001B[0m");
        }
    }

    /**
     * Consumer pinned to partition 0 of "kafkaBoot" in group "kafkaGroup3", prints in red.
     *
     * initialOffset = "0" forces consumption to start from offset 0 of the partition,
     * so previously consumed messages are replayed every time the listener (re)starts —
     * intentional here for demonstration, but not what you want in production.
     *
     * @param records full ConsumerRecords batch from partition 0
     */
    @KafkaListener(groupId = "kafkaGroup3",
            topicPartitions = {
                    @TopicPartition(topic = "kafkaBoot", // topic to listen on
                            partitionOffsets = {
                                    // partition 0, always seek back to offset 0 on startup
                                    @PartitionOffset(partition = "0", initialOffset = "0")
                            })
            })
    public void topic_test3(ConsumerRecords<String, Object> records) {
        for (ConsumerRecord<String, Object> record : records) {
            System.out.print("\u001B[31m");
            System.out.print("kafkaGroup3 消费了 主题为【" + record.topic() + "】的数据：\t 其中key为【" + record.key() + "】\tvalue为【" + record.value() + "】");
            System.out.println("\u001B[0m");
        }
    }
}
